package handle

import (
	"context"
	"github.com/jinzhu/copier"
	"redis-check/client"
	"redis-check/common"
	"redis-check/tool"
	"sync"
)

// RedisScanner holds the configuration and discovered DB layout used to scan
// a Redis deployment and hand the scanned keys to caller-supplied handlers.
type RedisScanner struct {
	// Host is the Redis endpoint; it is passed to tool.FetchRedisDbInfo and
	// client.NewRedisClient.
	Host           client.RedisHost
	// BatchCount is forwarded as client.ScanOpts.Count.
	BatchCount     int
	// MatchPattern is forwarded as client.ScanOpts.Match.
	MatchPattern   string
	// KeyList is forwarded as client.ScanOpts.KeyList.
	KeyList        []string
	// HandleThreads is the number of handler goroutines started per logical DB.
	HandleThreads  int
	// Qps is passed to common.StartQoS by each handler goroutine
	// (presumably a per-goroutine rate limit; confirm in common.StartQoS).
	Qps            int
	// physicalDBList is filled by StartScanRedis from tool.FetchRedisDbInfo;
	// ScanPhysicalDB opens one client per entry.
	physicalDBList []string
	// logicalDBMap is filled by StartScanRedis from tool.FetchRedisDbInfo.
	// Keys are logical DB indexes; values are reported as ScanState.DbKey
	// (presumably the key count of that DB; confirm in tool.FetchRedisDbInfo).
	logicalDBMap   map[int32]int64
}

// ScanState is the progress notification handed to the scan callback when the
// scan of a logical DB starts and again when it ends.
type ScanState struct {
	// Context points at the cancellable context created for the DB scan; it
	// is cancelled after the scan of that DB returns.
	Context *context.Context
	// CurrDb is the logical DB index being scanned.
	CurrDb  int32
	// DbKey is the value stored for CurrDb in the scanner's logical DB map
	// (presumably the number of keys in that DB; confirm with the producer).
	DbKey   int64
	// Run is true in the notification sent when the scan starts and false in
	// the one sent when it ends.
	Run     bool
}

// StartScanRedis loads the logical/physical DB layout of p.Host through
// tool.FetchRedisDbInfo, stores it on the scanner, and then scans each logical
// DB in turn.
//
// callback receives a ScanState when the scan of a DB starts and when it ends.
// provider is run once per DB to feed key batches into the queue.
// handler is invoked for every batch taken off the queue.
//
// Logical DBs are visited in map iteration order, which Go leaves unspecified.
func (p *RedisScanner) StartScanRedis(callback func(*ScanState), provider func(db int32, keyQueue chan<- []*common.Key), handler func(*client.RedisClient, int32, []*common.Key)) {
	logicalDBs, physicalDBs := tool.FetchRedisDbInfo(p.Host)
	p.logicalDBMap = logicalDBs
	p.physicalDBList = physicalDBs

	for dbIndex, dbValue := range p.logicalDBMap {
		p._scanLogicalDB(dbIndex, dbValue, callback, provider, handler)
	}
}

// _scanLogicalDB scans one logical DB with a producer/consumer pipeline and
// blocks until both sides are finished.
//
// callback is invoked with Run=true before the pipeline starts and with
// Run=false when this function returns; the context referenced by the state is
// cancelled after that final callback. provider runs in its own goroutine and
// feeds key batches into the queue; it is responsible for closing the queue so
// the handler goroutines can terminate. handler is invoked by the handler
// goroutines for every batch.
func (p *RedisScanner) _scanLogicalDB(currentDB int32, dbKey int64, callback func(*ScanState), provider func(db int32, keyQueue chan<- []*common.Key), handler func(*client.RedisClient, int32, []*common.Key)) {
	ctxStat, cancelStat := context.WithCancel(context.Background()) // cancelled explicitly once this DB is done
	// Deferred calls run in reverse order: the Run=false callback fires first,
	// then the context is cancelled.
	defer cancelStat()
	defer callback(&ScanState{Context: &ctxStat, CurrDb: currentDB, DbKey: dbKey, Run: false})
	common.Logger.Infof("开始比较DB：%d", currentDB)
	callback(&ScanState{Context: &ctxStat, CurrDb: currentDB, DbKey: dbKey, Run: true})

	// Always run at least one consumer. With zero consumers the provider
	// blocks as soon as the queue buffer is full and wg.Wait never returns;
	// a negative count would drive the WaitGroup counter below zero and panic.
	workers := p.HandleThreads
	if workers < 1 {
		workers = 1
	}

	keyQueue := make(chan []*common.Key, 100)
	var wg sync.WaitGroup
	wg.Add(1 + workers)
	go func() {
		defer wg.Done()
		provider(currentDB, keyQueue)
	}()
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			p._handleKeyInfo(currentDB, keyQueue, handler)
		}()
	}
	wg.Wait()
}

// ScanPhysicalDB scans logical DB db on every entry of p.physicalDBList, sends
// each scanned key batch to keyQueue, and closes keyQueue once all entries
// have been scanned. Its signature matches the provider parameter of
// StartScanRedis.
//
// It panics (after logging) if a client cannot be created or a scan fails.
func (p *RedisScanner) ScanPhysicalDB(db int32, keyQueue chan<- []*common.Key) {
	for index := range p.physicalDBList {
		p._scanPhysicalNode(index, db, keyQueue)
	}
	close(keyQueue)
}

// _scanPhysicalNode scans logical DB db on the physical instance at position
// index of p.physicalDBList and forwards every key batch to keyQueue.
//
// The per-instance work lives in its own function so the deferred Close runs
// as soon as that instance is done, and each client is closed exactly once.
func (p *RedisScanner) _scanPhysicalNode(index int, db int32, keyQueue chan<- []*common.Key) {
	var (
		redisClient client.RedisClient
		err         error
	)
	if p.Host.IsCluster() {
		// In cluster mode connect to the single address at this index
		// through a copy of the host configuration, so p.Host stays untouched.
		var singleHost client.RedisHost
		if err = copier.Copy(&singleHost, &p.Host); err != nil {
			panic(common.Logger.Errorf("copy redis host[%v] error[%v]", p.Host, err))
		}
		singleHost.Addr = []string{singleHost.Addr[index]}
		singleHost.DBType = common.TypeDB
		if redisClient, err = client.NewRedisClient(singleHost, db); err != nil {
			panic(common.Logger.Critical(err))
		}
	} else if redisClient, err = client.NewRedisClient(p.Host, db); err != nil {
		panic(common.Logger.Errorf("create redis client with host[%v] db[%v] error[%v]",
			p.Host, db, err))
	}
	defer redisClient.Close()

	common.Logger.Infof("start preview redis[%s] db[%d]", redisClient.String(), db)
	// Every batch produced by the scan is pushed to the consumers; the send
	// blocks while the queue is full.
	handler := func(keysInfo []*common.Key) (interface{}, error) {
		keyQueue <- keysInfo
		return true, nil
	}
	opts := client.ScanOpts{Count: p.BatchCount, Match: p.MatchPattern, Index: index, PhysicalDB: p.physicalDBList[index], KeyList: p.KeyList}
	if _, err = redisClient.ScanKeys(handler, opts); err != nil {
		panic(common.Logger.Errorf("scan redis key error: %v", err))
	}
}

// _handleKeyInfo is the consumer side of the scan pipeline. It opens its own
// client for logical DB db on p.Host, then hands every batch received from
// keyQueue to handler until the queue is closed and drained.
//
// Before each batch it waits for a value on the Bucket channel of the QoS
// object created from p.Qps (presumably a rate limiter; confirm in
// common.StartQoS). It panics (after logging) if the client cannot be created.
func (p *RedisScanner) _handleKeyInfo(db int32, keyQueue <-chan []*common.Key, handler func(*client.RedisClient, int32, []*common.Key)) {
	conn, err := client.NewRedisClient(p.Host, db)
	if err != nil {
		panic(common.Logger.Errorf("创建Redis连接失败，地址：%v，DB：%v，异常信息：%v", p.Host, db, err))
	}
	defer conn.Close()

	limiter := common.StartQoS(p.Qps)
	defer limiter.Close()

	for batch := range keyQueue {
		// Wait for the limiter before touching Redis.
		<-limiter.Bucket
		handler(&conn, db, batch)
	}
}
